Implement parallel compilation
authorAlex Crichton <alex@alexcrichton.com>
Sun, 29 Jun 2014 04:30:44 +0000 (21:30 -0700)
committerAlex Crichton <alex@alexcrichton.com>
Mon, 30 Jun 2014 23:06:00 +0000 (16:06 -0700)
This commit implements the -j flag in the `cargo_rustc` module, using the
primitives from the previous commits. The default parllelism is `os::num_cpus()`

This change also brings proper freshness propagation instead of the hokey logic
of once something is compiled, "compile everything to the right as well".

src/cargo/lib.rs
src/cargo/ops/cargo_rustc.rs
src/cargo/util/dependency_queue.rs
src/cargo/util/errors.rs

index 32a45e48b0f7174a43e68bc5533687dcd6b0cb07..314f9d0adf44a8b183c0bda5cb6eecfe1895aa3f 100644 (file)
@@ -1,7 +1,7 @@
 #![crate_id="cargo"]
 #![crate_type="rlib"]
 
-#![feature(macro_rules,phase)]
+#![feature(macro_rules, phase)]
 
 extern crate debug;
 extern crate term;
index 9b4489d79d69b124c9d3e1f1e968907b1bb48ab9..00798671bec7fe6d9a9f6126f3a2f32e7f8a52d9 100644 (file)
@@ -1,14 +1,15 @@
-use std::os::args;
-use std::io;
+use std::hash::Hasher;
+use std::hash::sip::SipHasher;
 use std::io::{File, IoError};
+use std::io;
+use std::os::args;
 use std::str;
-use std::hash::sip::SipHasher;
-use std::hash::Hasher;
+use term::color::YELLOW;
 
 use core::{Package, PackageSet, Target};
 use util;
 use util::{CargoResult, ChainError, ProcessBuilder, internal, human, CargoError};
-use util::{Config};
+use util::{Config, TaskPool, DependencyQueue, Fresh, Dirty, Freshness};
 
 type Args = Vec<String>;
 
@@ -17,10 +18,11 @@ struct Context<'a, 'b> {
     deps_dir: &'a Path,
     primary: bool,
     rustc_version: &'a str,
-    compiled_anything: bool,
     config: &'b mut Config<'b>
 }
 
+type Job = proc():Send -> CargoResult<()>;
+
 pub fn compile_targets<'a>(targets: &[&Target], pkg: &Package, deps: &PackageSet,
                            config: &'a mut Config<'a>) -> CargoResult<()> {
 
@@ -53,33 +55,38 @@ pub fn compile_targets<'a>(targets: &[&Target], pkg: &Package, deps: &PackageSet
         deps_dir: &deps_target_dir,
         primary: false,
         rustc_version: rustc_version.as_slice(),
-        compiled_anything: false,
         config: config
     };
 
-    // Traverse the dependencies in topological order
-    for dep in try!(topsort(deps)).iter() {
+    // Build up a list of pending jobs, each of which represent compiling a
+    // particular package. No actual work is executed as part of this, that's
+    // all done later as part of the `execute` function which will run
+    // everything in order with proper parallelism.
+    let mut jobs = Vec::new();
+    for dep in deps.iter() {
+        // Only compile lib targets for dependencies
         let targets = dep.get_targets().iter().filter(|target| {
-            // Only compile lib targets for dependencies
             target.is_lib() && target.get_profile().is_compile()
         }).collect::<Vec<&Target>>();
 
-        try!(compile(targets.as_slice(), dep, &mut cx));
+        jobs.push((dep,
+                   try!(compile(targets.as_slice(), dep, &mut cx))));
     }
 
     cx.primary = true;
     cx.dest = &target_dir;
+    jobs.push((pkg, try!(compile(targets, pkg, &mut cx))));
 
-    try!(compile(targets, pkg, &mut cx));
-
-    Ok(())
+    // Now that we've figured out everything that we're going to do, do it!
+    execute(cx.config, jobs)
 }
 
-fn compile(targets: &[&Target], pkg: &Package, cx: &mut Context) -> CargoResult<()> {
+fn compile(targets: &[&Target], pkg: &Package,
+           cx: &mut Context) -> CargoResult<(Freshness, Job)> {
     debug!("compile_pkg; pkg={}; targets={}", pkg, pkg.get_targets());
 
     if targets.is_empty() {
-        return Ok(());
+        return Ok((Fresh, proc() Ok(())))
     }
 
     // First check to see if this package is fresh.
@@ -96,35 +103,39 @@ fn compile(targets: &[&Target], pkg: &Package, cx: &mut Context) -> CargoResult<
     // TODO: Figure out how this works with targets
     let fingerprint_loc = cx.dest.join(format!(".{}.fingerprint",
                                                pkg.get_name()));
-    let (is_fresh, fingerprint) = try!(is_fresh(pkg, &fingerprint_loc, cx, targets));
-    if !cx.compiled_anything && is_fresh {
-        try!(cx.config.shell().status("Fresh", pkg));
-        return Ok(())
-    }
+    let (is_fresh, fingerprint) = try!(is_fresh(pkg, &fingerprint_loc, cx,
+                                                targets));
 
-    // Alright, so this package is not fresh and we need to compile it. Start
-    // off by printing a nice helpful message and then run the custom build
-    // command if one is present.
-    try!(cx.config.shell().status("Compiling", pkg));
+    let mut cmds = Vec::new();
 
     // TODO: Should this be on the target or the package?
     match pkg.get_manifest().get_build() {
-        Some(cmd) => try!(compile_custom(pkg, cmd, cx)),
+        Some(cmd) => cmds.push(compile_custom(pkg, cmd, cx)),
         None => {}
     }
 
     // After the custom command has run, execute rustc for all targets of our
     // package.
     for &target in targets.iter() {
-        try!(rustc(&pkg.get_root(), target, cx));
+        cmds.push(rustc(&pkg.get_root(), target, cx));
     }
 
-    // Now that everything has successfully compiled, write our new fingerprint
-    // to the relevant location to prevent recompilations in the future.
-    try!(File::create(&fingerprint_loc).write_str(fingerprint.as_slice()));
-    cx.compiled_anything = true;
+    cmds.push(proc() {
+        // If this job runs, then everything has successfully compiled, so write
+        // our new fingerprint to the relevant location to prevent
+        // recompilations in the future.
+        try!(File::create(&fingerprint_loc).write_str(fingerprint.as_slice()));
+        Ok(())
+    });
 
-    Ok(())
+    // TODO: this job itself may internally be parallel, but we're hiding that
+    //       currently. How to expose the parallelism among a single target?
+    Ok((if is_fresh {Fresh} else {Dirty}, proc() {
+        for cmd in cmds.move_iter() {
+            try!(cmd());
+        }
+        Ok(())
+    }))
 }
 
 fn is_fresh(dep: &Package, loc: &Path,
@@ -163,7 +174,8 @@ fn mk_target(target: &Path) -> Result<(), IoError> {
     io::fs::mkdir_recursive(target, io::UserRWX)
 }
 
-fn compile_custom(pkg: &Package, cmd: &str, cx: &Context) -> CargoResult<()> {
+fn compile_custom(pkg: &Package, cmd: &str,
+                  cx: &Context) -> Job {
     // FIXME: this needs to be smarter about splitting
     let mut cmd = cmd.split(' ');
     let mut p = util::process(cmd.next().unwrap())
@@ -174,10 +186,11 @@ fn compile_custom(pkg: &Package, cmd: &str, cx: &Context) -> CargoResult<()> {
     for arg in cmd {
         p = p.arg(arg);
     }
-    p.exec_with_output().map(|_| ()).map_err(|e| e.mark_human())
+    proc() p.exec_with_output().map(|_| ()).map_err(|e| e.mark_human())
 }
 
-fn rustc(root: &Path, target: &Target, cx: &Context) -> CargoResult<()> {
+fn rustc(root: &Path, target: &Target,
+         cx: &Context) -> Job {
 
     let crate_types = target.rustc_crate_types();
 
@@ -185,15 +198,18 @@ fn rustc(root: &Path, target: &Target, cx: &Context) -> CargoResult<()> {
          root.display(), target, crate_types, cx.dest.display(),
          cx.deps_dir.display(), cx.primary);
 
+    let primary = cx.primary;
     let rustc = prepare_rustc(root, target, crate_types, cx);
 
-    try!(if cx.primary {
-        rustc.exec().map_err(|err| human(err.to_str()))
-    } else {
-        rustc.exec_with_output().and(Ok(())).map_err(|err| human(err.to_str()))
-    });
-
-    Ok(())
+    proc() {
+        if primary {
+            rustc.exec().map_err(|err| human(err.to_str()))
+        } else {
+            rustc.exec_with_output().and(Ok(())).map_err(|err| {
+                human(err.to_str())
+            })
+        }
+    }
 }
 
 fn prepare_rustc(root: &Path, target: &Target, crate_types: Vec<&str>,
@@ -237,9 +253,56 @@ fn build_deps_args(dst: &mut Args, cx: &Context) {
     dst.push(cx.deps_dir.display().to_str());
 }
 
-fn topsort(deps: &PackageSet) -> CargoResult<PackageSet> {
-    match deps.sort() {
-        Some(deps) => Ok(deps),
-        None => return Err(internal("circular dependency detected"))
+/// Execute all jobs necessary to build the dependency graph.
+///
+/// This function will spawn off `config.jobs()` workers to build all of the
+/// necessary dependencies, in order. Freshness is propagated as far as possible
+/// along each dependency chain.
+fn execute(config: &mut Config,
+           jobs: Vec<(&Package, (Freshness, Job))>) -> CargoResult<()> {
+    let pool = TaskPool::new(config.jobs());
+    let (tx, rx) = channel();
+    let mut queue = DependencyQueue::new();
+    for (pkg, (fresh, job)) in jobs.move_iter() {
+        queue.enqueue(pkg, fresh, (pkg, job));
+    }
+
+    // Iteratively execute the dependency graph. Each turn of this loop will
+    // schedule as much work as possible and then wait for one job to finish,
+    // possibly scheduling more work afterwards.
+    let mut active = 0i;
+    while queue.len() > 0 {
+        loop {
+            match queue.dequeue() {
+                Some((name, Fresh, (pkg, _))) => {
+                    try!(config.shell().status("Fresh", pkg));
+                    tx.send((name, Fresh, Ok(())));
+                }
+                Some((name, Dirty, (pkg, job))) => {
+                    try!(config.shell().status("Compiling", pkg));
+                    let my_tx = tx.clone();
+                    pool.execute(proc() my_tx.send((name, Dirty, job())));
+                }
+                None => break,
+            }
+        }
+
+        // Now that all possible work has been scheduled, wait for a piece of
+        // work to finish. If any package fails to build then we stop scheduling
+        // work as quickly as possibly.
+        active -= 1;
+        match rx.recv() {
+            (name, fresh, Ok(())) => queue.finish(&name, fresh),
+            (_, _, Err(e)) => {
+                if active > 0 && config.jobs() > 1 {
+                    try!(config.shell().say("Build failed, waiting for other \
+                                             jobs to finish...", YELLOW));
+                    for _ in rx.iter() {}
+                }
+                return Err(e)
+            }
+        }
     }
+
+    Ok(())
 }
index b6761fc1bc24c8bd0f23f2ae72c9aa711a781994..689195b52c694f9f32dbecbefcf15a362c16fd48 100644 (file)
@@ -62,12 +62,17 @@ impl<T> DependencyQueue<T> {
     /// It is assumed that any dependencies of this package will eventually also
     /// be added to the dependency queue.
     pub fn enqueue(&mut self, pkg: &Package, fresh: Freshness, data: T) {
+        // ignore self-deps
+        if self.pkgs.contains_key(&pkg.get_name().to_str()) { return }
+
         if fresh == Dirty {
             self.dirty.insert(pkg.get_name().to_str());
         }
 
         let mut my_dependencies = HashSet::new();
         for dep in pkg.get_dependencies().iter() {
+            if dep.get_name() == pkg.get_name() { continue }
+
             let name = dep.get_name().to_str();
             assert!(my_dependencies.insert(name.clone()));
             let rev = self.reverse_dep_map.find_or_insert(name, HashSet::new());
index e67f9c0cf64ded595bd5e528a92eace02bbfc996..3d9953b04aaa5fe29622c284573e38facdad5fb8 100644 (file)
@@ -62,7 +62,7 @@ macro_rules! from_error (
     }
 )
 
-impl Show for Box<CargoError> {
+impl Show for Box<CargoError + Send> {
     fn fmt(&self, f: &mut Formatter) -> fmt::Result {
         try!(write!(f, "{}", self.description()));
         Ok(())